import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Kafka {@link Serializer} that converts a value into the byte payload of a record.
 *
 * <p>Three value kinds are recognised, checked in this order:
 * <ol>
 *   <li>{@link MessageLite} - encoded with {@link MessageLite#toByteArray()};</li>
 *   <li>{@link String} - encoded as UTF-8;</li>
 *   <li>{@code byte[]} - passed through unchanged.</li>
 * </ol>
 * Anything else, including {@code null}, serializes to {@code null}.
 *
 * <p>The raw {@code Serializer} supertype is kept as-is so the class's declared interface does
 * not change.
 */
public class ProtoBufSerializer implements Serializer {

    /**
     * Does nothing; this serializer has no settings to read.
     *
     * @param map ignored
     * @param b   ignored
     */
    @Override
    public void configure(Map map, boolean b) {
        // Intentionally empty.
    }

    /**
     * Serializes {@code data} according to its runtime type.
     *
     * @param topic topic the record is destined for; not used by this implementation
     * @param data  value to serialize; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null} or is not a
     *         {@link MessageLite}, {@link String} or {@code byte[]}
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        // instanceof is false for null, so a null value falls through to the final return.
        if (data instanceof MessageLite) {
            return ((MessageLite) data).toByteArray();
        }
        if (data instanceof String) {
            // The Charset overload cannot throw UnsupportedEncodingException, so there is no
            // failure path to swallow here.
            return ((String) data).getBytes(StandardCharsets.UTF_8);
        }
        if (data instanceof byte[]) {
            // Returned as-is: the caller's array is not copied.
            return (byte[]) data;
        }

        // NOTE(review): unsupported value types yield null instead of an error - confirm that
        // callers rely on this before tightening it.
        return null;
    }

    /**
     * Does nothing; this serializer holds no resources.
     */
    @Override
    public void close() {
        // Intentionally empty.
    }
}